home *** CD-ROM | disk | FTP | other *** search
- # Source Generated with Decompyle++
- # File: in.pyc (Python 2.4)
-
- '''Test harness for the logging module. Run all tests.
-
- Copyright (C) 2001-2002 Vinay Sajip. All Rights Reserved.
- '''
- import select
- import os
- import sys
- import string
- import struct
- import types
- import cPickle
- import cStringIO
- import socket
- import threading
- import time
- import logging
- import logging.handlers as logging
- import logging.config as logging
# Layout of the separator line written by banner(); the two fields are
# filled with a section name and a marker such as 'begin' or 'end'.
BANNER = (
    '-- %-10s %-6s '
    '---------------------------------------------------\n'
)

# Sentinel text: LogRecordStreamHandler.handleLogRecord flags the server to
# stop once a record carrying exactly this message arrives.
FINISH_UP = (
    "Finish up, it's closing time. "
    "Messages should bear numbers 0 through 24."
)

# NOTE(review): not referenced anywhere else in the visible source.
TIMEOUT = 10
- from SocketServer import ThreadingTCPServer, StreamRequestHandler
-
class LogRecordStreamHandler(StreamRequestHandler):
    '''
    Handler for a streaming logging request. Each record read from the
    connection is passed to a local logger named after the record's origin.
    '''

    def handle(self):
        '''
        Handle multiple requests - each expected to be a 4-byte big-endian
        length, followed by the LogRecord attributes in pickle format.

        Returns when the peer closes the connection. If the connection ends
        part-way through a header or a record, the incomplete data is
        discarded instead of being waited for indefinitely.
        '''
        while True:
            header = self._recv_exactly(4)
            if header is None:
                break
            slen = struct.unpack('>L', header)[0]
            payload = self._recv_exactly(slen)
            if payload is None:
                break
            obj = self.unPickle(payload)
            record = logging.makeLogRecord(obj)
            self.handleLogRecord(record)

    def _recv_exactly(self, size):
        '''
        Read exactly `size` bytes from the connection.

        Returns the data, or None when the peer closes the connection
        before `size` bytes have arrived.
        '''
        data = self.connection.recv(size)
        while len(data) < size:
            more = self.connection.recv(size - len(data))
            if not more:
                # An empty read means the peer has gone away; looping on
                # would never make progress.
                return None
            data = data + more
        return data

    def unPickle(self, data):
        '''Return the object stored in the pickled string `data`.'''
        # NOTE(security): unpickling can execute arbitrary code. The data
        # here comes straight from a socket, so this is only acceptable
        # while the peer is trusted.
        return cPickle.loads(data)

    def handleLogRecord(self, record):
        '''
        Re-log `record` through the logger 'logrecv.tcp.<record.name>'.

        The record's message gets a ' (via <logger name>)' suffix. A record
        whose message equals FINISH_UP also sets the server's abort flag.
        '''
        logname = 'logrecv.tcp.' + record.name
        # Compare before the suffix is appended below.
        if record.msg == FINISH_UP:
            self.server.abort = 1

        record.msg = record.msg + ' (via ' + logname + ')'
        logger = logging.getLogger(logname)
        logger.handle(record)
-
-
# Set by LogRecordSocketReceiver.serve_until_stopped when it stops serving.
socketDataProcessed = threading.Event()


class LogRecordSocketReceiver(ThreadingTCPServer):
    '''
    A simple-minded TCP socket-based logging receiver suitable for test
    purposes.
    '''
    allow_reuse_address = 1

    def __init__(self, host='localhost',
                 port=logging.handlers.DEFAULT_TCP_LOGGING_PORT,
                 handler=LogRecordStreamHandler):
        '''
        Bind to (host, port) and serve requests with `handler`.

        `abort` is the stop flag polled by serve_until_stopped; `timeout`
        is the number of seconds each poll waits for a connection.
        '''
        ThreadingTCPServer.__init__(self, (host, port), handler)
        self.abort = 0
        self.timeout = 1

    def serve_until_stopped(self):
        '''
        Accept connections until `self.abort` becomes true.

        The flag is re-read after every poll, so at least one poll is made.
        On the way out - normally or because of an exception - the
        module-level event is set and the listening socket is closed, so a
        thread waiting on the event cannot be left blocked.
        '''
        try:
            abort = 0
            while not abort:
                (rd, wr, ex) = select.select(
                    [self.socket.fileno()], [], [], self.timeout)
                if rd:
                    self.handle_request()
                abort = self.abort
        finally:
            try:
                # Notify the waiting thread that we are about to exit.
                socketDataProcessed.set()
            finally:
                # Release the listening socket; handler threads keep their
                # own connection sockets.
                self.server_close()

    def process_request(self, request, client_address):
        '''Run finish_request for this connection in a new thread.'''
        t = threading.Thread(target=self.finish_request,
                             args=(request, client_address))
        t.start()
-
-
-
def runTCP(tcpserver):
    '''Thread target: run `tcpserver.serve_until_stopped()` to completion.'''
    serve = tcpserver.serve_until_stopped
    serve()
-
# Number that the next call to nextmessage() will embed in its result.
msgcount = 0


def nextmessage():
    '''Return 'Message <n>' for the current counter, then advance it.'''
    global msgcount
    current = msgcount
    msgcount = current + 1
    return 'Message %d' % current
-
-
def test0():
    '''
    Exercise level inheritance over a small hierarchy of named loggers.

    Every call logs the next numbered message from nextmessage(); the run
    ends by logging FINISH_UP through the 'INF' logger.
    '''
    def say(logger, *methods):
        # Log one fresh numbered message through each named Logger method.
        for method in methods:
            getattr(logger, method)(nextmessage())

    def say_fatal(logger):
        # Log one fresh numbered message at logging.FATAL.
        logger.log(logging.FATAL, nextmessage())

    err = logging.getLogger('ERR')
    err.setLevel(logging.ERROR)
    inf = logging.getLogger('INF')
    inf.setLevel(logging.INFO)
    inf_err = logging.getLogger('INF.ERR')
    inf_err.setLevel(logging.ERROR)
    deb = logging.getLogger('DEB')
    deb.setLevel(logging.DEBUG)

    # These loggers get no level of their own.
    inf_undef = logging.getLogger('INF.UNDEF')
    inf_err_undef = logging.getLogger('INF.ERR.UNDEF')
    undef = logging.getLogger('UNDEF')

    # The grandchild is requested before its parent 'INF.BADPARENT'.
    grandchild = logging.getLogger('INF.BADPARENT.UNDEF')
    child = logging.getLogger('INF.BADPARENT')

    # First 25 messages (numbers 0 through 24, as FINISH_UP announces).
    say_fatal(err)
    say(err, 'error')

    say_fatal(inf)
    say(inf, 'error', 'warn', 'info')

    say_fatal(inf_undef)
    say(inf_undef, 'error', 'warn', 'info')

    say_fatal(inf_err)
    say(inf_err, 'error')

    say_fatal(inf_err_undef)
    say(inf_err_undef, 'error')

    say_fatal(deb)
    say(deb, 'error', 'warn', 'info', 'debug')

    say_fatal(undef)
    say(undef, 'error', 'warn', 'info')

    say_fatal(grandchild)
    say_fatal(child)

    # Remaining calls are below the level set on the logger or on its
    # nearest ancestor that has one.
    say(err, 'warn', 'info', 'debug')
    say(inf, 'debug')
    say(inf_undef, 'debug')
    say(inf_err, 'warn', 'info', 'debug')
    say(inf_err_undef, 'warn', 'info', 'debug')

    inf.info(FINISH_UP)
-
# Custom numeric logging levels, from 1 (BORING) up to 10 (SILENT).
(BORING, CHATTERBOX, GARRULOUS, TALKATIVE, VERBOSE,
 SOCIABLE, EFFUSIVE, TERSE, TACITURN, SILENT) = range(1, 11)

# Every custom level in ascending order.
LEVEL_RANGE = range(BORING, SILENT + 1)

# Display name for each custom level.
my_logging_levels = dict([
    (SILENT, 'Silent'),
    (TACITURN, 'Taciturn'),
    (TERSE, 'Terse'),
    (EFFUSIVE, 'Effusive'),
    (SOCIABLE, 'Sociable'),
    (VERBOSE, 'Verbose'),
    (TALKATIVE, 'Talkative'),
    (GARRULOUS, 'Garrulous'),
    (CHATTERBOX, 'Chatterbox'),
    (BORING, 'Boring'),
])
-
class SpecificLevelFilter(logging.Filter):
    '''Filter that rejects records logged at exactly one given level.'''

    def __init__(self, lvl):
        '''Remember `lvl` as the single level number to reject.'''
        # Run the base initialiser as well, so the attributes it sets up
        # exist even though filter() below does not consult them.
        logging.Filter.__init__(self)
        self.level = lvl

    def filter(self, record):
        '''Return true unless `record.levelno` equals the stored level.'''
        return self.level != record.levelno
-
-
-
class GarrulousFilter(SpecificLevelFilter):
    '''SpecificLevelFilter preset to the GARRULOUS level.'''

    def __init__(self):
        '''Initialise the parent filter with GARRULOUS as its level.'''
        rejected_level = GARRULOUS
        SpecificLevelFilter.__init__(self, rejected_level)
-
-
-
class VerySpecificFilter(logging.Filter):
    '''Filter that rejects records at the SOCIABLE or TACITURN level.'''

    def filter(self, record):
        '''Return false for SOCIABLE/TACITURN records, true otherwise.'''
        level = record.levelno
        return not (level == SOCIABLE or level == TACITURN)
-
-
-
def message(s):
    '''Write `s`, formatted with '%s' and a trailing newline, to stdout.'''
    line = '%s\n' % s
    out = sys.stdout
    out.write(line)
-
# Message template logged by test1; its one field receives the name of the
# level the message is logged at.
SHOULD1 = "This should only be seen at the '%s' logging level (or lower)"


def test1():
    '''
    Exercise custom levels together with level and filter based filtering.

    The same sweep runs four times: plain, with the first root handler
    limited to SOCIABLE, with a GarrulousFilter on that handler, and with a
    VerySpecificFilter added to the root logger as well.
    '''
    # Tell the logging system the names of the custom levels.
    for number, label in my_logging_levels.items():
        logging.addLevelName(number, label)

    root = logging.getLogger('')
    handler = root.handlers[0]

    def sweep():
        # For each custom level in turn, set it as the root threshold and
        # log one event at every custom level.
        for threshold in LEVEL_RANGE:
            message("-- setting logging level to '%s' -----"
                    % logging.getLevelName(threshold))
            root.setLevel(threshold)
            for lvl in LEVEL_RANGE:
                root.log(lvl, SHOULD1, logging.getLevelName(lvl))

    sweep()

    handler.setLevel(SOCIABLE)
    message('-- Filtering at handler level to SOCIABLE --')
    sweep()
    # Level 0 switches level filtering at the handler off again.
    handler.setLevel(0)

    garrulous = GarrulousFilter()
    handler.addFilter(garrulous)
    message('-- Filtering using GARRULOUS filter --')
    sweep()

    specific = VerySpecificFilter()
    root.addFilter(specific)
    message('-- Filtering using specific filter for SOCIABLE, TACITURN --')
    sweep()

    root.removeFilter(specific)
    handler.removeFilter(garrulous)
    # SILENT shares its number (10) with logging.DEBUG, so put the standard
    # name back.
    logging.addLevelName(logging.DEBUG, 'DEBUG')
-
# Progress line written by test2 before each of its numbered INFO events.
MSG = '-- logging %d at INFO, messages should be seen every 10 events --'


def test2():
    '''
    Exercise logging.handlers.MemoryHandler in front of the root handler.

    The first root handler is swapped for a MemoryHandler (capacity 10,
    flush level WARNING) that targets it. Events are then logged at DEBUG,
    INFO and WARNING, followed by 102 numbered INFO events, after which
    the original handler is put back.
    '''
    root = logging.getLogger('')
    stream_handler = root.handlers[0]
    stream_handler.close()
    root.removeHandler(stream_handler)

    buffered = logging.handlers.MemoryHandler(
        10, logging.WARNING, stream_handler)
    root.setLevel(logging.DEBUG)
    root.addHandler(buffered)

    steps = (
        ('-- logging at DEBUG, nothing should be seen yet --',
         'debug', 'Debug message'),
        ('-- logging at INFO, nothing should be seen yet --',
         'info', 'Info message'),
        ('-- logging at WARNING, 3 messages should be seen --',
         'warn', 'Warn message'),
    )
    for announcement, method, text in steps:
        message(announcement)
        getattr(root, method)(text)

    for index in xrange(102):
        message(MSG % index)
        root.info('Info index = %d', index)

    # Restore the handler arrangement found on entry.
    buffered.close()
    root.removeHandler(buffered)
    root.addHandler(stream_handler)
-
# Logger name that test3 passes to logging.Filter.
FILTER = 'a.b'


def doLog3():
    '''Log 'Info 1' to 'Info 10' at INFO through ten fixed logger names.'''
    names = (
        'a', 'a.b', 'a.c', 'a.b.c', 'a.b.c.d',
        'a.bb.c', 'b', 'b.a', 'c.a.b', 'a.bb',
    )
    number = 0
    for name in names:
        number = number + 1
        logging.getLogger(name).info('Info %d' % number)
-
-
def test3():
    '''
    Run doLog3 twice: unfiltered, then with a logging.Filter built from
    FILTER attached to the first root handler. The filter is removed again
    before returning.
    '''
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.DEBUG)
    handler = root_logger.handlers[0]

    message('Unfiltered...')
    doLog3()

    message("Filtered with '%s'..." % FILTER)
    name_filter = logging.Filter(FILTER)
    handler.addFilter(name_filter)
    doLog3()
    handler.removeFilter(name_filter)
-
-
def banner(nm, typ):
    '''Write the BANNER line filled in with `nm` and `typ`, then flush.'''
    out = sys.stdout
    out.write(BANNER % (nm, typ))
    out.flush()
-
-
def test_main_inner():
    '''
    Run test0 to test3 with output on stdout and a TCP log receiver.

    The root logger gets a stdout handler and, for test0 only, a socket
    handler whose records are received by a LogRecordSocketReceiver thread
    and buffered under the 'logrecv' logger. The buffered receiver output
    is printed once all tests are done and the receiver has stopped.
    '''
    rootLogger = logging.getLogger('')
    rootLogger.setLevel(logging.DEBUG)
    hdlr = logging.StreamHandler(sys.stdout)
    fmt = logging.Formatter(logging.BASIC_FORMAT)
    hdlr.setFormatter(fmt)
    rootLogger.addHandler(hdlr)

    # Sends events to the receiver; attached to the root logger only while
    # test0 runs.
    shdlr = logging.handlers.SocketHandler(
        'localhost', logging.handlers.DEFAULT_TCP_LOGGING_PORT)

    # Receiver-side output is collected in memory and does not propagate
    # beyond the 'logrecv' logger.
    sockOut = cStringIO.StringIO()
    sockLogger = logging.getLogger('logrecv')
    sockLogger.setLevel(logging.DEBUG)
    sockhdlr = logging.StreamHandler(sockOut)
    sockhdlr.setFormatter(
        logging.Formatter('%(name)s -> %(levelname)s: %(message)s'))
    sockLogger.addHandler(sockhdlr)
    sockLogger.propagate = 0

    threads = []
    tcpserver = LogRecordSocketReceiver()
    threads.append(threading.Thread(target=runTCP, args=(tcpserver,)))
    for thread in threads:
        thread.start()

    finished = False
    try:
        banner('log_test0', 'begin')
        rootLogger.addHandler(shdlr)
        test0()
        shdlr.close()
        rootLogger.removeHandler(shdlr)
        banner('log_test0', 'end')

        banner('log_test1', 'begin')
        test1()
        banner('log_test1', 'end')

        banner('log_test2', 'begin')
        test2()
        banner('log_test2', 'end')

        banner('log_test3', 'begin')
        test3()
        banner('log_test3', 'end')
        finished = True
    finally:
        if not finished:
            # A test raised, so FINISH_UP may never have reached the
            # receiver. Ask it to stop directly; otherwise the wait below
            # could block forever.
            tcpserver.abort = 1

        # Wait for the TCP receiver to terminate.
        socketDataProcessed.wait()
        for thread in threads:
            thread.join()

        banner('logrecv output', 'begin')
        sys.stdout.write(sockOut.getvalue())
        sockOut.close()
        sockLogger.removeHandler(sockhdlr)
        sockhdlr.close()
        banner('logrecv output', 'end')
        sys.stdout.flush()

        # Closing is best effort, but only ordinary errors are ignored.
        try:
            hdlr.close()
        except Exception:
            pass

        rootLogger.removeHandler(hdlr)
-
-
-
def test_main():
    '''
    Run test_main_inner under the user's default locale.

    The LC_ALL setting found on entry is restored afterwards. If querying
    or switching the locale fails, the tests still run and nothing is
    restored.
    '''
    import locale

    saved = None
    try:
        saved = locale.setlocale(locale.LC_ALL)
        locale.setlocale(locale.LC_ALL, '')
    except (ValueError, locale.Error):
        saved = None

    try:
        test_main_inner()
    finally:
        if saved is not None:
            locale.setlocale(locale.LC_ALL, saved)
-
-
-
if __name__ == '__main__':
    # Run as a script: write the test name line, then run every test.
    sys.stdout.write('test_logging\n')
    test_main()
-
-